/**
 * 
 */
package cn.x6game.socket;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

import com.app.framework.socket.Packet;

/**
 * @author lisong
 * @version 2013-2-26 下午2:41:32
 */
public class SocketClient {

	private SocketClient() {

	}
	static {
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/log4j.properties");//指定log4j的位置
	}
	
	static Logger logger = Logger.getLogger(SocketClient.class);
	static AtomicLong ato = new AtomicLong();
	static AtomicLong sendtimes = new AtomicLong();
	static ClientBootstrap bootstrap;
	static AtomicBoolean stop = new AtomicBoolean(true);
	public static void start() throws Exception {

		bootstrap = new ClientBootstrap(
				new NioClientSocketChannelFactory(
						Executors.newCachedThreadPool(),
						Executors.newCachedThreadPool()));

		OrderedMemoryAwareThreadPoolExecutor executor = new OrderedMemoryAwareThreadPoolExecutor(100, 0, 0);
//		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
		SocketPipelineFactory factory = new SocketPipelineFactory(executor);

		bootstrap.setPipelineFactory(factory);

		bootstrap.setOption("child.tcpNoDelay", true);// child的属性  是与客户端建立连接的socket的,非child的是服务器负责监听客户端连接的主serversocket的
//		bootstrap.setOption("child.writeBufferHighWaterMark", 5000 * 60 * 1024);
//		bootstrap.setOption("child.writeBufferLowWaterMark", 0);
//		bootstrap.setOption("child.receiveBufferSize", 1048576);
		// Setting this option to a value such as 200, 500 or 1000, tells the
		// TCP stack how long the "accept" queue can be.
		// If this option is not configured, then the backlog depends on OS
		// setting.

		// c.getConfig().setBufferFactory(HeapChannelBufferFactory.getInstance(ByteOrder.LITTLE_ENDIAN));//默认大头,在此可设置
	}
	
	public static Channel connect() {
		ChannelFuture cf = bootstrap.connect(new InetSocketAddress("localhost",9999));
		cf.awaitUninterruptibly();
		Channel channel = cf.getChannel();
		return channel;
	}
	public static CountDownLatch cd = new CountDownLatch(10);
	
	public static void main(String[] args) throws Exception {
		SocketClient.start();
		Runnable r = new Runnable() {
			@Override
			public void run() {
				
				List<Channel> lst = new ArrayList<Channel>();
				
				for(int i=0;i<1;i++) {
					Channel c1 = connect();
					Packet p1 = new Packet((short) 3, String.valueOf(ato.incrementAndGet()).getBytes());
					c1.write(p1).awaitUninterruptibly();
					lst.add(c1);
				}
				cd.countDown();
				try {
					cd.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				while(true) {
					try {
						for(Channel c : lst) {
							if(c.isWritable()) {
								Packet p3 = new Packet((short) 2, String.valueOf(ato.incrementAndGet()).getBytes());
								c.write(p3);
								if(sendtimes.addAndGet(1) % 10000 == 0) {
									System.out.println("readable:"+c.isReadable()+",write......sendtime:"+sendtimes.get());
								}
							}
							else {
								synchronized (c.getAttachment()) {
									System.out.println("unwriteable wait......."+ato.get());
									c.getAttachment().wait();
								}
							}
						}
						if(stop.get()) {
							for(Channel c : lst) {
								Packet p3 = new Packet((short) 2, ("stop"+ato.get()).getBytes());
								c.write(p3);
							}
							synchronized (cd) {
								System.out.println("stop......."+ato.get());
								cd.wait();
							}
						}	
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		};
		List<Thread> lst = new ArrayList<Thread>();
		for(int i=0;i<10;i++) {
			Thread t = new Thread(r);
			t.start();
			lst.add(t);
		}
		Thread thread = new Thread(new Runnable() {
			@Override
			public void run() {
				BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
				try {
					while(true) {
						if(br.ready()) {
							String str = br.readLine();
							try {
								stop.set(Integer.parseInt(str) == 1);
							} catch (Exception e) {
								stop.set(false);
							}
							
							if(!stop.get()) {
								synchronized (cd) {
									cd.notifyAll();
									System.out.println("runing...........");
								}
							}
						}
						Thread.sleep(1000);
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		thread.start();
		lst.add(thread);
		for(int i=0;i<lst.size();i++) {
			lst.get(i).join();
		}
		System.out.println("client start");
	}
}
